﻿#include "service_http_loader.hh"
#include "../../src/repo/src/include/root/rpc_root.h"
#include "../../thirdparty/jsoncpp/include/json/json.h"
#include "../../thirdparty/klogger/klogger/interface/logger.h"
#include "../box/service_box.hh"
#include "../config/box_config.hh"
#include "../detail/lang_impl.hh"
#include "../lang/lang.hh"
#include "../util/object_pool.hh"
#include "../util/os_util.hh"
#include "../util/string_util.hh"
#include "../util/time_util.hh"
#include "../detail/scope_curl.hh"
#include <chrono>
#include <memory>

namespace kratos {
namespace service {

/**
 * 创建版本描述文件
 * \param base 目录
 * \param version 版本号
 * \return true或者false
 */
auto create_version_file(const std::string &base,
  const std::string &version) -> bool {
  std::ofstream ofs;
  ofs.open(util::complete_path(base, version),
    std::ios::out | std::ios::trunc);
  if (!ofs) {
    return false;
  }
  return true;
}

auto create_version_file(const std::string& version_file_path) {
  std::ofstream ofs;
  ofs.open(version_file_path, std::ios::out | std::ios::trunc);
  if (!ofs) {
    return false;
  }
  return true;
}

ServiceHttpLoader::ServiceHttpLoader() {}

ServiceHttpLoader::~ServiceHttpLoader() {
  if (running_) {
    stop();
  }
}

auto ServiceHttpLoader::get_version_file(
  const std::string &version_api,
  BundleInfoVector &bundle_infos,
  std::string &version) -> bool {
  bundle_infos.clear();
  ScopeCurl curl;
  // 获取版本文件
  auto retval = curl.perform(version_api);
  if (!retval) {
    box_->write_log(
      lang::LangID::LANG_HTTP_CONNECT_REMOTE_REPO_FAIL,
      klogger::Logger::WARNING, version_api.c_str()
    );
    return false;
  }
  // 解析版本文件
  Json::CharReaderBuilder builder;
  auto reader = std::unique_ptr<Json::CharReader>(builder.newCharReader());
  Json::Value root;
  auto *content = curl.get_content();
  auto content_size = curl.get_content_size();
  auto ret = reader->parse(content, content + content_size, &root, nullptr);
  if (!ret) {
    // 版本文件解析失败
    box_->write_log(
      lang::LangID::LANG_HTTP_PARSE_REMOTE_VERSION_CONFIG_FAIL,
      klogger::Logger::WARNING, version_api.c_str()
    );
    return false;
  }
  // 检查当前版本文件与最新版本文件是否不同，如果一致则不更新
  if (root.isMember("version") && root.isMember("bundles")) {
    // 获取版本号
    version = root["version"].asString();
    // 填写bundle_infos，这些文件是需要更新的文件
    auto bundle_count = root["bundles"].size();
    for (decltype(bundle_count) i = 0; i < bundle_count; i++) {
      if (!root["bundles"][i].isMember("uuid") ||
          !root["bundles"][i].isMember("name")) {
        box_->write_log(
          lang::LangID::LANG_HTTP_PARSE_REMOTE_VERSION_CONFIG_FAIL,
          klogger::Logger::WARNING, version_api.c_str()
        );
        bundle_infos.clear();
        return false;
      }
      auto bundle_uuid = root["bundles"][i]["uuid"].asString();
      auto bundle_name = root["bundles"][i]["name"].asString();
      bundle_infos.push_back({bundle_uuid, bundle_name});
    }
  }
  return true;
}

auto ServiceHttpLoader::get_bundle_file(const std::string &file_path,
  const std::string &bundle_file_url)
    -> bool {
  ScopeCurl curl;
  if (!curl.perform(bundle_file_url)) {
    box_->write_log(
      lang::LangID::LANG_HTTP_CONNECT_REMOTE_REPO_FAIL,
      klogger::Logger::WARNING, bundle_file_url.c_str()
    );
    return false;
  }
  std::ofstream ofs;
  // Bundle文件为二进制
  ofs.open(file_path, std::ios::out | std::ios::binary | std::ios::trunc);
  if (!ofs) {
    box_->write_log(
      lang::LangID::LANG_HTTP_CREATE_LOCAL_FILE_FAIL,
      klogger::Logger::WARNING,
      bundle_file_url.c_str(),
      file_path.c_str()
    );
    ofs.close();
    return false;
  }
  if (!curl.to_stream(ofs)) {
    box_->write_log(
      lang::LangID::LANG_HTTP_CREATE_LOCAL_FILE_FAIL,
      klogger::Logger::WARNING,
      bundle_file_url.c_str(),
      file_path.c_str()
    );
    ofs.close();
    util::remove_file(file_path);
    return false;
  }
  ofs.close();
  return true;
}

auto ServiceHttpLoader::check_version(const std::string &version_root,
  const std::string &version) -> bool {
  if (!util::is_path_exists(version_root)) {
    if (!util::make_dir(version_root)) {
      box_->write_log(
        lang::LangID::LANG_HTTP_CREATE_LOCAL_DIR_FAIL,
        klogger::Logger::WARNING, version_root.c_str()
      );
      return false;
    } else {
      // 目录建立成功
    }
  } else {
    // 目录已经存在，查看是否已经全部加载过了
    if (util::is_path_exists(version_root + ".done")) {
      // 全部加载过了
      return false;
    } else {
      // 有部分加载失败了,还需要继续加载
    }
  }
  return true;
}

auto ServiceHttpLoader::download_bundle_files(
  const BundleInfoVector &bundle_infos,
  const std::string &version_root,
  const std::string &version,
  const std::string &remote_service_repo_dir) -> bool {
  for (const auto &bundle_info : bundle_infos) {
    auto bundle_local_path =
      util::complete_path(version_root, bundle_info.name);
    if (util::is_path_exists(bundle_local_path)) {
      // 文件已经存在
      continue;
    }
    auto full_url_path = util::complete_path_url(
      remote_service_repo_dir, bundle_info.name + "?version=" + version);
    auto ret = get_bundle_file(bundle_local_path, full_url_path);
    if (!ret) {
      box_->write_log(
        lang::LangID::LANG_HTTP_DOWNLOAD_REMOTE_BUNDLE_FAIL,
        klogger::Logger::WARNING,
        full_url_path.c_str()
      );
      return false;
    }
  }
  return true;
}

auto ServiceHttpLoader::get_info(
  std::string &service_dir,
  std::string &version_api,
  std::string &remote_service_repo_dir,
  std::string &temp_service_dir) -> bool {
  {
    std::lock_guard<std::mutex> guard(mutex_);
    service_dir             = service_dir_;
    version_api             = version_api_;
    remote_service_repo_dir = remote_service_repo_dir_;
    temp_service_dir        = temp_service_dir_;
  }
  if (version_api.empty() || remote_service_repo_dir.empty()) {
    // 未配置
    return false;
  }
  if (!util::is_path_exists(temp_service_dir)) {
    if (!util::make_dir(temp_service_dir)) {
      box_->write_log(
        lang::LangID::LANG_HTTP_CREATE_LOCAL_DIR_FAIL,
        klogger::Logger::WARNING, temp_service_dir.c_str()
      );
      return false;
    }
  }
  return true;
}

auto ServiceHttpLoader::start(ServiceBox *box) -> bool {
  if (!box) {
    return false;
  }
  box_ = box;
  update_config();
  // 添加一个热更新监听器
  box_->get_config().add_reload_listener(
    "ServiceHttpLoader",
    [&](const std::string&, const config::BoxConfig &) {
      std::lock_guard<std::mutex> guard(mutex_);
      // 更新配置
      update_config();
    });
  // 如果开启则在每次服务容器启动的时候强制去下载一次最新版本的服务
  download_lastest_bundles_startup();
  running_ = true;
  start_worker();  
  return true;
}

auto ServiceHttpLoader::update_config()->void {
  // 更新配置
  if (!box_) {
    return;
  }
  auto &config = box_->get_config();

  service_dir_                = config.get_service_dir();
  version_api_                = config.get_remote_service_repo_version_api();
  remote_service_repo_dir_    = config.get_remote_service_repo_dir();
  is_open_remote_update_      = config.is_open_remote_update();
  remote_repo_check_interval_ = config.get_remote_repo_check_interval();

  temp_service_dir_ = util::complete_path(service_dir_, "temp_bundle_dir");
}

auto ServiceHttpLoader::notify_update() -> void {
  if (!worker_only_infos_.empty()) {
    for (const auto& req : worker_only_infos_) {
      worker_to_main_.enqueue(req);
    }
    worker_only_infos_.clear();
  }
}

auto ServiceHttpLoader::start_worker()->void {
  // 开启线程，周期性检测远程服务更新
  libcurl_thread_ = std::thread([&]() {
    std::time_t tick = 0;
    while (running_) {
      if (is_open_remote_update_) {
        check_update_forever(tick);
      } else if ((!is_open_remote_update_) && update_once_) {
        check_update_once();
      } else {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    }
  });
}

auto ServiceHttpLoader::check_update_forever(std::time_t& tick) -> void {
  std::this_thread::sleep_for(std::chrono::milliseconds(1));
  tick += 1;
  if (tick >= remote_repo_check_interval_) {
    notify_update();
  } else {
    return;
  }
  try {
    do_update();
  } catch (std::exception& ex) {
    box_->write_log(
      lang::LangID::LANG_HTTP_UPDATE_REMOTE_SERVICE_EXCEPT,
      klogger::Logger::WARNING, ex.what()
    );
  }
  tick = 0;
}

auto ServiceHttpLoader::check_update_once() -> void {
  try {
    do_update();
  } catch (std::exception& ex) {
    box_->write_log(
      lang::LangID::LANG_HTTP_UPDATE_REMOTE_SERVICE_EXCEPT,
      klogger::Logger::WARNING, ex.what()
    );
  }
  notify_update();
  update_once_ = false;
}

auto ServiceHttpLoader::do_update() -> void {
  std::string service_dir;
  std::string version_api;
  std::string remote_service_repo_dir;
  std::string temp_service_dir;
  if (!get_info(service_dir, version_api, remote_service_repo_dir,
    temp_service_dir)) {
    return;
  }
  BundleInfoVector bundle_infos;
  std::string version;
  auto retval = get_version_file(version_api, bundle_infos, version);
  if (!retval) {
    // 获取不到版本信息
    return;
  }
  std::string version_root =
      util::complete_path(temp_service_dir, "version" + version);
  auto check_ret = check_version(version_root, version);
  if (!check_ret) {
    // 出错了或者版本已经是最新
    return;
  }
  // 获取版本文件内容
  if (!pull_all(bundle_infos, temp_service_dir, remote_service_repo_dir,
     version_root, version)) {
    return;
  }
}

auto ServiceHttpLoader::finish_update(
  const std::string &version_root,
  const std::string &version,
  const BundleInfoVector &bundle_infos) -> bool {
  // 建立完成文件
  if (!create_version_file(version_root + ".done")) {
    return false;
  }
  for (const auto &bundle_info : bundle_infos) {
    worker_only_infos_.push_back(
      {
        bundle_info.uuid,
        util::complete_path(version_root, bundle_info.name)
      }
    );
  }
  return true;
}

auto ServiceHttpLoader::pull_all(
  const BundleInfoVector &bundle_infos,
  const std::string &temp_service_dir,
  const std::string &remote_service_repo_dir,
  const std::string &version_root,
  const std::string &version) -> bool {
  bool all_success = download_bundle_files(bundle_infos, version_root, version,
    remote_service_repo_dir);
  if (!all_success) {
    // 失败的还需下次再下载
    return false;
  }
  return finish_update(version_root, version, bundle_infos);
}

auto ServiceHttpLoader::force_update() -> void {
  update_once_ = true;
}

auto ServiceHttpLoader::stop() -> bool {
  running_ = false;
  if (libcurl_thread_.joinable()) {
    libcurl_thread_.join();
  }
  if (box_) {
    box_->get_config().remove_reload_listener("ServiceHttpLoader");
  }
  // 工作线程已经关闭，释放所有更新下载进来的服务，但不包含启动时下载的最新版本
  for (const auto &info : main_only_infos_) {
    rpc::unloadClassUnsafe(box_->get_rpc(), info.uuid);
  }
  worker_only_infos_.clear();
  return true;
}

auto ServiceHttpLoader::update() -> void {
  BundleInfo bundle_info;
  while (worker_to_main_.try_dequeue(bundle_info)) {
    if (!rpc::loadClass(box_->get_rpc(), bundle_info.uuid, bundle_info.name,
       true, box_->get_context())) {
      box_->write_log(
        lang::LangID::LANG_HTTP_UPDATE_SERVICE_FAIL,
        klogger::Logger::WARNING, bundle_info.name.c_str()
      );
    } else {
      for (const auto &info : main_only_infos_) {
        if (info.uuid == bundle_info.uuid) {
          return;
        }
      }
      main_only_infos_.push_back(bundle_info);
    }
  }
}

auto ServiceHttpLoader::update_once()->std::size_t {
  std::size_t count = 0;
  BundleInfo bundle_info;
  while (worker_to_main_.try_dequeue(bundle_info)) {
    if (!rpc::loadClass(box_->get_rpc(), bundle_info.uuid, bundle_info.name,
      true, box_->get_context())) {
      box_->write_log(
        lang::LangID::LANG_HTTP_UPDATE_SERVICE_FAIL,
        klogger::Logger::WARNING, bundle_info.name.c_str()
      );
    } else {
      // 重新注册UUID
      box_->register_service(bundle_info.uuid);
    }
    count += 1;
  }
  return count;
}

auto ServiceHttpLoader::check_version_exists(service::ServiceBox* box,
  const std::string& version) -> bool {
  auto tmp_bundle_dir =
    util::complete_path(box->get_config().get_service_dir(),
      "temp_bundle_dir");
  std::string version_root =
    util::complete_path(tmp_bundle_dir, "version" + version);
  return util::is_path_exists(version_root + ".done");
}

auto ServiceHttpLoader::download_lastest_bundles_startup() -> bool {
  if (!box_) {
    return false;
  }
  if (!is_open_remote_update_) {
    return true;
  }
  const auto &remote_service_repo_latest_version_api =
    box_->get_config().get_remote_service_repo_latest_version_api();
  const auto &version_api =
    box_->get_config().get_remote_service_repo_version_api();
  if (remote_service_repo_latest_version_api.empty() || version_api.empty()) {
    return false;
  }
  const auto &service_dir = box_->get_config().get_service_dir();
  const auto &remote_service_repo_dir =
    box_->get_config().get_remote_service_repo_dir();
  BundleInfoVector bundle_infos;
  std::string version;
  auto retval = get_version_file(remote_service_repo_latest_version_api,
    bundle_infos, version);
  if (!retval) {
    box_->write_log(
      lang::LangID::LANG_HTTP_CONNECT_REMOTE_REPO_FAIL,
      klogger::Logger::WARNING,
      remote_service_repo_latest_version_api.c_str());
    return false;
  }
  // 下载所有需要更新的bundle文件
  bool all_success = true;
  for (const auto &bundle_info : bundle_infos) {
    auto full_url_path = util::complete_path_url(
      remote_service_repo_dir, bundle_info.name + "?version=latest");
    auto ret = get_bundle_file(
      util::complete_path(service_dir, bundle_info.name), full_url_path);
    if (!ret) {
      box_->write_log(
        lang::LangID::LANG_HTTP_DOWNLOAD_REMOTE_BUNDLE_FAIL,
        klogger::Logger::WARNING,
        full_url_path.c_str()
      );
      all_success = false;
    }
  }
  if (all_success) {
    // 所有最新版本下载成功
    if (!create_version_file(temp_service_dir_, version + ".done")) {
      box_->write_log(
        lang::LangID::LANG_HTTP_CREATE_LOCAL_VERSION_FILE_FAIL,
        klogger::Logger::WARNING,
        version.c_str()
      );
    }
  }
  return true;
}

} // namespace service
} // namespace kratos
